package com.shujia.flink.tf

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object Demo7Window {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))

    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))

    /**
      * 每隔5秒统计单词的数量
      *
      */

    kvDS
      .keyBy(_._1)
      .timeWindow(Time.seconds(5)) //滚动窗口
      .sum(1)
      .print()

    env.execute()


  }

}
